iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 19
0
Software Development

用30天介紹 open source 專案 Ohara 系列 第 19

Day 19 實作 Ohara 的 Source Connector 介面

  • 分享至 

  • xImage
  •  

Ohara 官方有提供了一些 Source Connector,像是 JDBC Source Connector、Ftp Source Connector、Perf Source Connector…等等的 Connector,如果使用者另外有其它需求也可以自已來寫程式實作 Ohara 的 Connector 介面,這樣的好處是寫完 connector 的程式並且打包成 jar 檔之後,就可以透過 ohara manager 的 UI 介面上傳到 ohara 的 Worker Cluster, 之後就可以透過 UI 的方式拉出自已實作的 Connector 程式並且執行 Pipeline。

如果是實作 Kafka 提供的 connector 介面,要自已手動做非常多的事,像是把 jar 檔佈署到 worker cluster 裡、設定 plugin dir 參數、使用 Kafka Worker 提供的 Restful API 來啟動自已實作的 connector…等等的工作。Ohara 的目標就是要讓開發者能更關注在connector 程式,處理資料邏輯的部份。

今天主要會介紹要如何實作 Ohara 提供的 Connector 介面,來實作亂數產生數字的 Source Connector,這個 Connector 名稱我就把它命名成 RandomNumberSourceConnecotr。

在實作程式之前需要設定 build.gradle 的檔案,如下:

apply plugin: 'scala' 
sourceCompatibility = 1.8 

repositories { 
  mavenCentral() 
  maven { 
    url "https://dl.bintray.com/oharastream/ohara" 
  } 
} 
 
tasks.withType(JavaCompile){ 
  options.encoding = 'UTF-8' 
} 
 
tasks.withType(Javadoc){ 
  options.encoding = 'UTF-8' 
} 
 
dependencies { 
  compile 'org.scala-lang:scala-library:2.12.9' 
  compile 'junit:junit:4.12' 
  compile 'com.island.ohara:ohara-client:0.7.1' 
  compile 'com.island.ohara:ohara-common:0.7.1' 

compile 'com.island.ohara:ohara-kafka:0.7.1' 
} 

以上的 build.gradle 檔案,主要會匯入要實作 ohara connector 介面的 jar 檔

RandomNumberSourceConnecotr.scala

class RandomNumberSourceConnecotr extends RowSourceConnector { 
  private[perf] var settings: TaskSetting = _ 
  override def _taskClass(): Class[_ <: RowSourceTask] = classOf[RandomNumberTask] 
 
  override def _taskSettings(maxTasks: Int): util.List[TaskSetting] = Seq.fill(maxTasks)(settings).asJava 
 
  override def _start(config: TaskSetting): Unit = { 
    this.settings = settings 
  } 
 
  override def _stop(): Unit = { 
     //Nothing 
  } 
 
  override protected def _definitions():  java.util.List[SettingDef] = Seq().asJava 
} 

以上的程式碼主要是 Connector 程式碼的進入點,它會先繼承 RowSourceConnector Ohara 定義的 Connector 介面,背後會去包裝 Kafka 的 Connector 介面, 之後實作 _taskClass、_taskSettings、_start、_definitions 方法,在這裡值得一提的是 _definitions 方法,它的做用主要是定義了有哪些設定的參數,並且設定給 connector 使用,如果有定義 _definitions 這樣 UI 才能自動的載入,實作 Connector 的設定參數畫面。

RandomNumberTask.scala

class RandomNumberTask extends RowSourceTask { 
  private[perf] var schema: Seq[Column] = _ 
  private[this] var topics: Seq[String] = _ 
 
  override def _start(settings: TaskSetting): Unit = { 
    this.topics = settings.topicNames().asScala 
    this.schema = settings.columns.asScala 
   } 
 
   override def _stop(): Unit = { 
 
    } 
 
    override def _poll(): util.List[RowSourceRecord] = { 
      Thread.sleep(5000) 
      val value = CommonUtils.randomString() 
 
      val row: Row = Row.of( 
      schema.sortBy(_.order).map { c => 
         Cell.of( 
           c.name, 
           c.dataType match { 
           case DataType.BOOLEAN => false 
           case DataType.BYTE => ByteUtils.toBytes(value).head 
           case DataType.BYTES => ByteUtils.toBytes(value) 
           case DataType.SHORT => value.toShort 
           case DataType.INT => value.toInt 
           case DataType.LONG => value 
           case DataType.FLOAT => value.toFloat 
          case DataType.DOUBLE => value.toDouble 
          case DataType.STRING => value.toString 
          case _ => value 
        } 
      ) 
    }: _* 
  ) 
  val records: Seq[RowSourceRecord] = topics.map(RowSourceRecord.builder().row(row).topicName(_).build()) 
     records.toList.asJava 
  } 
} 

以上的程式主要是定義在執行 connector 下的 task,在這裡的程式主要會定期的將亂數資料寫入到 Kafka Topic,另外 Ohara 也定義 Row 來當作回傳資料的格式類型,Row 下面定義了 DataType,因此 Ohara 的 Connector 程式資料有 Metadata 的概念。

今天已經簡單的介紹要如何撰寫 Ohara 的 source connector,在這裡只是簡單的情境,在實際的情況還要考慮很多東西,像是資料 offset 的管理、task reblance 時要做的處理…等等的議題,如果有興趣也可以參考以下的連結,來了解 connector 的實作。

https://ohara.readthedocs.io/en/latest/custom_connector.html

今天寫的程式還沒有做測試,因此明天還會繼續介紹有關於 connector 在 Ohara 測試的部份。

以下是今天實作的 Source Connector 連結:
https://github.com/jackyoh/ohara-connector-demo

以上的文章經過了 Day 20 的測試程式,發現以上的程式是有 bug。正確的程式碼已經在以下的連結修正了:
https://github.com/jackyoh/ohara-connector-demo


上一篇
Day 18 Build Ohara 的 Docker Image
下一篇
Day 20 撰寫 Source Connector 測試程式
系列文
用30天介紹 open source 專案 Ohara 30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言